Design

Peregrine is designed to take advantage of modern hardware and draws on recent developments in MapReduce design, including ideas from Cassandra, FlumeJava, BigTable, Map-Reduce-Merge, and other recent systems.

Partitioning and Iterative Jobs

Peregrine was designed primarily for iterative jobs, where job S is set up a priori to join against job S+1.

We accomplish this by partitioning the data into deterministic locations, so that a range of keys (a partition) always lives on the same machine.

This enables efficient merging of data from the previous iteration.
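A deterministic key routing function is what makes this possible. The sketch below is hypothetical: `KeyRouter` and the choice of MD5 are assumptions, not Peregrine's actual routing function. It hashes a key and maps contiguous ranges of the hash space onto partitions, so a given key always lands in the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical router: each partition owns a contiguous range of a 31-bit hash space. */
final class KeyRouter {
    private final int nrPartitions;

    KeyRouter(int nrPartitions) {
        if (nrPartitions <= 0) {
            throw new IllegalArgumentException("nrPartitions must be positive");
        }
        this.nrPartitions = nrPartitions;
    }

    int partitionFor(String key) {
        byte[] digest;
        try {
            digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every standard JDK", e);
        }
        // Build an unsigned 32-bit value from the first 4 digest bytes.
        long first32 = 0;
        for (int i = 0; i < 4; i++) {
            first32 = (first32 << 8) | (digest[i] & 0xFF);
        }
        long hash = first32 >>> 1; // non-negative 31-bit hash in [0, 2^31)
        // Scale into [0, nrPartitions): contiguous hash ranges map to the same partition.
        return (int) ((hash * nrPartitions) >>> 31);
    }
}
```

Because the result depends only on the key and the partition count, iteration S+1 can join against iteration S partition by partition, provided both use the same partition count.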

Partition Layout

GFS and HDFS both store data in blocks of about 64-256MB. These blocks are evenly distributed across the cluster, which enables parallel recovery if an individual machine fails.

Many MapReduce clusters host a large amount of data per box. A naive partition layout might keep all of a host's data on the same small group of physical machines (its replicas).

However, 1TB of data does not replicate quickly: with only a few copies of the data to read from, it could take hours to get back up to the minimum number of replicas.

Peregrine avoids this problem with a partition layout that spreads each host's replicas across many other hosts.

Partitions are first assigned in terms of priority groups; each host has nr_replica priority groups. Map and reduce jobs only execute with a concurrency of nr_partitions_per_host / nr_replica.

The layout strategy is designed to maximize recovery throughput, so that when a host fails its data can be recovered from nr_partitions_per_host hosts (one for each partition).
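The difference between recovering from a few replicas and recovering from nr_partitions_per_host hosts can be estimated with back-of-envelope arithmetic. The sketch below assumes each source host streams about 100MB/s from one SATA disk (an assumed figure, not a measurement) and ignores network and destination limits; `RecoveryEstimate` and its methods are hypothetical names.

```java
/** Hypothetical back-of-envelope helpers for recovery time and per-host concurrency. */
final class RecoveryEstimate {
    /** Seconds to re-replicate dataMB when reading from `sources` hosts in parallel. */
    static double recoverySeconds(double dataMB, int sources, double mbPerSecPerSource) {
        return dataMB / (sources * mbPerSecPerSource);
    }

    /** Per-host map/reduce concurrency when each host has nrReplica priority groups. */
    static int concurrency(int nrPartitionsPerHost, int nrReplica) {
        return nrPartitionsPerHost / nrReplica;
    }

    public static void main(String[] args) {
        double oneTB = 1_000_000; // in MB
        double disk = 100;        // assumed sequential MB/s per source disk
        System.out.printf("2 sources:  %.1f hours%n", recoverySeconds(oneTB, 2, disk) / 3600);
        System.out.printf("20 sources: %.1f minutes%n", recoverySeconds(oneTB, 20, disk) / 60);
        System.out.println("concurrency: " + concurrency(20, 2));
    }
}
```

Under these assumptions `main` prints about 1.4 hours for 2 sources versus 8.3 minutes for 20 sources, and a concurrency of 10 for 20 partitions per host with 2 replicas.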

Figure 1: Proper initial partition layout.

The highlighted regions show how each priority group is offset and cycled between horizontal and diagonal placement. This setup allows us to shift the partitions to maximize recovery throughput.

Figure 2: Layout after machine0:11111 has been marked failed.

In this configuration, the highlighted partitions serve as replicas during recovery. In normal production environments the number of partitions per host would be in the 10-25 range. Since each recovering host picks up one additional partition, it handles 1/nr_partitions_per_host more load, or 4-10% (1/25 to 1/10).
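The horizontal/diagonal cycling itself is shown only in the figures, so the following is a different, simpler layout that achieves the same recovery property, restricted to nr_replica = 2. It is a hypothetical sketch, not Peregrine's scheme, and `PairLayout` is an invented name. Partition (h, d) has its primary on host h and its replica on host (h + d) mod n for d = 1..m; with 2m < n, no two hosts share more than one partition, so a failed host's 2m partitions are recovered from 2m distinct hosts.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical 2-replica layout where any two hosts share at most one partition. */
final class PairLayout {
    /** Returns, for each host index, the ids of the partitions stored on it. */
    static List<List<Integer>> build(int nrHosts, int primariesPerHost) {
        if (primariesPerHost < 1 || 2 * primariesPerHost >= nrHosts) {
            throw new IllegalArgumentException("need 1 <= primariesPerHost and 2 * primariesPerHost < nrHosts");
        }
        List<List<Integer>> hosts = new ArrayList<>();
        for (int i = 0; i < nrHosts; i++) {
            hosts.add(new ArrayList<>());
        }
        for (int h = 0; h < nrHosts; h++) {
            for (int d = 1; d <= primariesPerHost; d++) {
                int partition = h * primariesPerHost + (d - 1);
                hosts.get(h).add(partition);                 // primary copy
                hosts.get((h + d) % nrHosts).add(partition); // replica, offset by d
            }
        }
        return hosts;
    }

    /** Hosts holding a surviving copy of at least one partition from failedHost. */
    static Set<Integer> recoverySources(List<List<Integer>> hosts, int failedHost) {
        Set<Integer> lost = new HashSet<>(hosts.get(failedHost));
        Set<Integer> sources = new TreeSet<>();
        for (int i = 0; i < hosts.size(); i++) {
            if (i == failedHost) {
                continue;
            }
            for (int partition : hosts.get(i)) {
                if (lost.contains(partition)) {
                    sources.add(i);
                }
            }
        }
        return sources;
    }
}
```

With 10 hosts and 4 primaries per host, each host stores 8 partitions (concurrency 8 / 2 = 4), and any failed host is recovered from 8 distinct hosts, each taking on one extra partition (1/8, or 12.5%, more load).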

It's also possible to set up dedicated hosts just for failover, with fewer partitions than the rest of the machines in the cluster.

Multicore and Concurrency

Peregrine is designed to run on cheap commodity hardware. A typical configuration uses inexpensive 1TB SATA drives and 8-16 cores per box with 8-12GB of RAM.

One difficulty is that a machine with 4 SATA drives and 16 cores is nearly equivalent to four machines, each with one SATA drive and 4 cores.

Peregrine partitions the data correctly across the 4 SATA drives, while maintaining decent performance, by running 4 daemons per box (one per drive).

Each daemon has a unique port and PFS mount point. The first daemon could run on port 11111 and have a mount point of /d2/peregrine-fs. The second would run on port 11112 and have a mount point of /d3/peregrine-fs, and so on.
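The per-disk daemon convention can be captured in a small configuration generator. This is a hypothetical helper (`DaemonConfig`, `perDisk`, and the `firstDisk` parameter are invented for illustration); it only encodes the port and mount-point pattern described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-disk daemon settings: one PFS daemon per SATA drive. */
final class DaemonConfig {
    final int port;
    final String mountPoint;

    DaemonConfig(int port, String mountPoint) {
        this.port = port;
        this.mountPoint = mountPoint;
    }

    /** Daemon i listens on basePort + i and stores data on /d{firstDisk + i}/peregrine-fs. */
    static List<DaemonConfig> perDisk(int nrDisks, int basePort, int firstDisk) {
        List<DaemonConfig> daemons = new ArrayList<>();
        for (int i = 0; i < nrDisks; i++) {
            daemons.add(new DaemonConfig(basePort + i, "/d" + (firstDisk + i) + "/peregrine-fs"));
        }
        return daemons;
    }

    @Override
    public String toString() {
        return port + " -> " + mountPoint;
    }
}
```

`perDisk(4, 11111, 2)` yields daemons on ports 11111-11114 with mount points /d2/peregrine-fs through /d5/peregrine-fs; on a 16-core box each daemon then corresponds to roughly 4 cores and one disk.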

This lets us use the hardware efficiently, specifically the SATA disks, while also maximizing CPU concurrency by using all cores of the machine.

Distributed Filesystem

All data is stored in a filesystem named PFS (Partitioned File System).

PFS is a simple distributed filesystem meant to meet only the needs of our specific use case.

All files are 'sequence files' (key/value files), with the data routed by key.

Files are spread across chunks; by default the maximum chunk size is 128MB.

A file is a collection of these individual chunks.

Chunks are written on key/value boundaries, so a given map() can work directly with an individual chunk without having to split the file.
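Writing on record boundaries amounts to starting a new chunk whenever the next record would overflow the current one. The in-memory sketch below illustrates that rule; the length-prefixed record format and the `ChunkedWriter` class are assumptions for illustration, not PFS's actual on-disk format.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical chunked key/value writer that never splits a record across chunks. */
final class ChunkedWriter {
    private final long maxChunkSize;
    private final List<byte[]> chunks = new ArrayList<>();
    private ByteArrayOutputStream current = new ByteArrayOutputStream();

    ChunkedWriter(long maxChunkSize) {
        this.maxChunkSize = maxChunkSize;
    }

    void write(byte[] key, byte[] value) {
        long recordSize = 8L + key.length + value.length; // two 4-byte length prefixes
        // Roll to a new chunk if this record would overflow the current one.
        // A single record larger than maxChunkSize still gets a chunk of its own.
        if (current.size() > 0 && current.size() + recordSize > maxChunkSize) {
            rollChunk();
        }
        writeInt(key.length);
        current.write(key, 0, key.length);
        writeInt(value.length);
        current.write(value, 0, value.length);
    }

    /** Flushes the final partial chunk and returns all chunks in order. */
    List<byte[]> close() {
        if (current.size() > 0) {
            rollChunk();
        }
        return chunks;
    }

    private void rollChunk() {
        chunks.add(current.toByteArray());
        current = new ByteArrayOutputStream();
    }

    private void writeInt(int v) { // big-endian
        current.write(v >>> 24);
        current.write(v >>> 16);
        current.write(v >>> 8);
        current.write(v);
    }
}
```

Because each chunk holds only whole records, a map() task can start reading at the beginning of any chunk without scanning for a record boundary.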

These files are split across the underlying partitions by key using a key routing function.

On disk the files are stored in a path hierarchy given by the user.

For example, if a user writes to /tmp/extract-output.dat, we write this file into the PFS root directory on all nodes, first routed by key and then stored in individual chunks.
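To make the example concrete, the hypothetical helper below builds the local path of one chunk of a user file on a node. The directory scheme (a zero-padded partition directory under the PFS root, then the user's path, then numbered chunk files) is an assumption; the text only states that files are routed by key and stored as chunks under the PFS root.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical on-disk naming for PFS chunks. */
final class PfsPaths {
    /** Layout assumed here: {pfsRoot}/{partition, zero-padded}/{user path}/chunk{index}.dat */
    static Path chunkPath(String pfsRoot, int partition, String userPath, int chunkIndex) {
        // Strip the leading '/' so the user path nests under the partition directory.
        String relative = userPath.startsWith("/") ? userPath.substring(1) : userPath;
        return Paths.get(pfsRoot, String.format("%06d", partition), relative,
                String.format("chunk%06d.dat", chunkIndex));
    }
}
```

Under this assumed scheme, chunk 3 of partition 7 for /tmp/extract-output.dat on the daemon mounted at /d2/peregrine-fs would live at /d2/peregrine-fs/000007/tmp/extract-output.dat/chunk000003.dat.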

Write Pipeline

All writes are done via HTTP PUTs to the master for a specific partition. Writes are then pipelined to each replica one by one until all replicas have a copy of the data.

The write pipeline is specified by the X-pipeline HTTP header.

We use HTTP chunked encoding to write data in atomic units. When a PFS node receives the first chunk, it opens an HTTP connection to the first machine specified in X-pipeline and includes a new X-pipeline header specifying the remaining machines (if any) in the write pipeline.

The data then flows through the nodes one by one. Because the writing host sends each byte only once rather than once per replica, it gets the full gigabit Ethernet bandwidth when writing to N hosts, instead of just 1/N of it.
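The header handling at each hop can be sketched as follows. The comma-separated host:port format for X-pipeline is an assumption, `WritePipeline` and `nextHop` are hypothetical names, and the HTTP forwarding itself is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical X-pipeline parsing: pop the next replica, pass on the rest. */
final class WritePipeline {
    /** The next replica to forward to, plus the X-pipeline value to send it. */
    static final class Hop {
        final String target;
        final String remaining; // empty when target is the last replica

        Hop(String target, String remaining) {
            this.target = target;
            this.remaining = remaining;
        }
    }

    /** Returns empty when this node is the last hop in the pipeline. */
    static Optional<Hop> nextHop(String xPipeline) {
        List<String> hosts = new ArrayList<>();
        if (xPipeline != null) {
            for (String part : xPipeline.split(",")) {
                String host = part.trim();
                if (!host.isEmpty()) {
                    hosts.add(host);
                }
            }
        }
        if (hosts.isEmpty()) {
            return Optional.empty();
        }
        String remaining = String.join(",", hosts.subList(1, hosts.size()));
        return Optional.of(new Hop(hosts.get(0), remaining));
    }
}
```

A master receiving `X-pipeline: host2:11112,host3:11113` would store the chunk, forward it to host2:11112 with `X-pipeline: host3:11113`, and host3:11113, seeing no further hops, would only store it.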

Job Pipelines

Many iterative jobs are built from multi-stage job pipelines (not to be confused with PFS replication write pipelines).

A system that is not aware of these job pipelines may simply write each stage's output to the filesystem, then map over it in the next stage.

Traditionally this is done for simplicity, but the intermediate IO hurts performance when the same recoverability can be achieved without writing the unnecessary intermediate data.

Peregrine was designed to factor job pipelines into the core architecture to prevent this from becoming a performance bottleneck.

For Extract Transform and Load (ETL) jobs, the intermediate data is often discarded anyway.
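One way a pipeline-aware system can avoid the intermediate write is to fuse consecutive map stages so that each record flows through both in memory. The sketch below shows that idea (similar in spirit to operation fusion in FlumeJava); it illustrates the technique rather than describing Peregrine's internal mechanism, and `MapFusion.fuse` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical fusion of two one-to-many map stages into a single pass. */
final class MapFusion {
    static <A, B, C> Function<A, List<C>> fuse(Function<A, List<B>> first,
                                               Function<B, List<C>> second) {
        return input -> {
            List<C> out = new ArrayList<>();
            // Intermediate values from the first stage stay in memory; nothing is persisted.
            for (B mid : first.apply(input)) {
                out.addAll(second.apply(mid));
            }
            return out;
        };
    }
}
```

For example, fusing a stage that splits lines into words with a stage that keeps lowercase words longer than three characters turns "The Quick brown fox" into ["quick", "brown"] without materializing the word list between stages on disk.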

The only intermediate IO that we persist is "shuffle groups", which are logs storing shuffle output from map tasks.

These files form the core of our checkpointing functionality.

We use a large buffer (128MB-1GB) so that, on the reducer nodes responsible for the data, similar map output is kept in a contiguous region on disk.

Partition and Shuffle Group Performance

Because Peregrine is a partitioned system, we store a number of partitions per node, usually in the 10-25 range. While this allows for better throughput during node failure, it comes at a performance cost.

Traditional SATA HDDs cannot perform fast random IO. Peak performance is achieved with a single sequential reader or writer using the drive at a time.

However, the impact isn't major if you keep the number of parallel tasks to a minimum.

This is a benchmark of shuffle group performance across varying numbers of partitions and buffer sizes.

Shuffle groups operate by keeping an in-memory buffer of currently streaming IO and then flushing it to disk when it is complete.

This leaves large regions of the disk available for contiguous reads.
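The buffering behavior can be sketched as a per-partition buffer that is written out in one large sequential write when it fills. This is a hypothetical illustration: `ShuffleGroup`, the shared append-only log, and the region index are assumptions; in production the buffer size would be in the 128MB-1GB range rather than the few bytes used in a test.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical shuffle group: buffer per partition, flush whole buffers to a log. */
final class ShuffleGroup {
    /** Where each flushed buffer landed in the log: {partition, offset, length}. */
    final List<long[]> regions = new ArrayList<>();

    private final int bufferSize;
    private final OutputStream log;
    private final Map<Integer, ByteArrayOutputStream> buffers = new LinkedHashMap<>();
    private long offset = 0;

    ShuffleGroup(int bufferSize, OutputStream log) {
        this.bufferSize = bufferSize;
        this.log = log;
    }

    /** Buffers incoming map output for a partition; flushes once the buffer is full. */
    void accept(int partition, byte[] data) throws IOException {
        ByteArrayOutputStream buf = buffers.computeIfAbsent(partition, p -> new ByteArrayOutputStream());
        buf.write(data, 0, data.length);
        if (buf.size() >= bufferSize) {
            flush(partition, buf);
        }
    }

    /** Flushes any remaining partial buffers at the end of the map phase. */
    void close() throws IOException {
        for (Map.Entry<Integer, ByteArrayOutputStream> e : buffers.entrySet()) {
            if (e.getValue().size() > 0) {
                flush(e.getKey(), e.getValue());
            }
        }
        log.flush();
    }

    private void flush(int partition, ByteArrayOutputStream buf) throws IOException {
        int length = buf.size();
        buf.writeTo(log); // one large sequential write per flush
        regions.add(new long[] {partition, offset, length});
        offset += length;
        buf.reset();
    }
}
```

A reducer working on one partition can then read that partition's regions as a few large sequential reads instead of many small ones.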

When a node performs a reduce, it works on one partition at a time. This lets us take a small performance hit during reduces in exchange for massive recovery parallelism during node failures.

Shuffle IO throughput optimization

The original MapReduce paper and Hadoop implement shuffling by first writing map() output to the local disk and then, once the map is complete, transferring the data to the target node.

Google's implementation wrote the data into N files on disk on the machine running the map() and then transferred the smaller chunks to the machines responsible for performing the reduction.

The problem with this approach is that it requires extra local disk IO for the intermediate write.

"Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers."

"When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together."

It's unclear whether Google was buffering this data in memory or whether it was called 'buffered' because it wasn't yet sent to the reducer node.

Peregrine does a full bypass of the local disk and instead writes the data directly to the remote node.

This provides the following benefit: we perform only one read and one write of the data, versus two reads and two writes with the local intermediate file approach.

Failure during Map

Failure of a node during a map makes shuffling difficult: because map output is written directly to remote nodes, the reducers would hold partial output from the failed map while a secondary node comes online and finishes that map.

The controller tracks this by keeping a log of completed maps and transferring this log to reducers. When a reduce starts, the reducer simply skips output from failed maps.
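The skip step can be sketched as a filter over shuffled records. This is a hypothetical illustration: tagging each record with the map attempt that produced it, and the `ShuffleRecord`/`ReduceInputFilter` names, are assumptions about how partial output could be identified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** A shuffled key/value pair tagged with the map attempt that produced it (hypothetical). */
final class ShuffleRecord {
    final String mapAttempt;
    final String key;
    final String value;

    ShuffleRecord(String mapAttempt, String key, String value) {
        this.mapAttempt = mapAttempt;
        this.key = key;
        this.value = value;
    }
}

final class ReduceInputFilter {
    /** Keeps only records from map attempts that the controller logged as completed. */
    static List<ShuffleRecord> filter(List<ShuffleRecord> shuffled, Set<String> completedMaps) {
        List<ShuffleRecord> kept = new ArrayList<>();
        for (ShuffleRecord r : shuffled) {
            if (completedMaps.contains(r.mapAttempt)) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

If attempt "map17-a0" fails partway and "map17-a1" completes on another node, only records tagged "map17-a1" reach the reduce function.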

In a long-running map job on a reasonable network configuration, we might expect, say, 10 machines to fail in a cluster of 1000. In this situation the maximum excess shuffle data would be 10 x chunk_size (10 x 128MB = 1280MB), or approximately 1.25GB.

Failure detection

All machines in Peregrine send gossip back to the controller. This allows the controller to decide which machines have failed during a computation so that they can be taken out of production.

When a machine is marked as failed, the controller takes it out of production: no new jobs or partitions are assigned to it.
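The controller-side bookkeeping can be sketched as a last-seen table refreshed by gossip. The timeout-based rule is an assumption (the text does not say how the controller decides a machine has failed), `FailureTracker` is a hypothetical name, and times are passed in explicitly to keep the sketch testable.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical gossip-driven failure tracking on the controller. */
final class FailureTracker {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();
    private final Set<String> failed = new HashSet<>();

    FailureTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a gossip message; gossip from an already-failed host is ignored. */
    void onGossip(String host, long nowMillis) {
        if (!failed.contains(host)) {
            lastSeen.put(host, nowMillis);
        }
    }

    /** Marks hosts silent for longer than the timeout as failed; returns the newly failed ones. */
    Set<String> sweep(long nowMillis) {
        Set<String> newlyFailed = new HashSet<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis && failed.add(e.getKey())) {
                newlyFailed.add(e.getKey());
            }
        }
        return newlyFailed;
    }

    /** Failed (or never-seen) hosts are not eligible for new jobs or partitions. */
    boolean canAssign(String host) {
        return lastSeen.containsKey(host) && !failed.contains(host);
    }
}
```

Once a host is marked failed it stays out of production, even if late gossip from it arrives afterwards.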

Memory Allocation

Peregrine is designed to take advantage of as many operating system facilities as possible instead of relying on potentially inefficient Java alternatives.

Only core data structures are allocated within the JDK's heap. All other memory (shuffle output buffers, shuffle sorting buffers, and merging buffers) is allocated as direct buffers via mmap(). Where possible we mlock() these pages into memory to avoid them being swapped to disk during a critical operation.
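In pure JDK code, an mmap()-backed off-heap buffer can be obtained with `FileChannel.map`, as in the sketch below (`MappedBuffers` is a hypothetical name, and backing the buffer with a file is an assumption). The JDK exposes no mlock() or fadvise() equivalents, so those calls would need native code such as JNI or JNA, which is omitted here.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper for file-backed, off-heap buffers created with mmap(). */
final class MappedBuffers {
    static MappedByteBuffer allocate(Path file, int size) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // READ_WRITE mapping grows the file to `size` if needed;
            // the mapping stays valid after the channel is closed.
            return ch.map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }
}
```

The returned buffer lives outside the Java heap, so large shuffle or merge buffers allocated this way do not add to garbage-collection pressure.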

We also use fadvise() to purge pages from the VFS page cache for files that will no longer be used, reducing pressure on the page cache.